package com.ray.woodencreate.mq;


import com.ray.woodencreate.exception.BusinessExceptionFactory;
import com.ray.woodencreate.logs.SystemLogBuilder;
import com.ray.woodencreate.result.MsgCodeConstant;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.util.ObjectUtils;

import java.util.*;

/**
 * @author bo shen
 * @Description: SwaggerConfig配置
 * @Class: SwaggerConfig
 * @Package com.toptoday.woodencreate.config
 * @date 2018/10/22 9:34
 * @company <p>Ray快速开发平台</p>
 * @updateRecord time(修改时间)  author(修改人)   desc(修改内容)
 */
@Data
@Slf4j
public class RocketConsumerConfig implements ApplicationContextAware {

    @Value("${rocketmq.consumer.namesrvAddr}")
    private String namesrvAddr;
    @Value("${rocketmq.consumer.groupname}")
    private String groupName;
    @Value("${rocketmq.consumer.consumeThreadMin}")
    private int consumeThreadMin;
    @Value("${rocketmq.consumer.consumeThreadMax}")
    private int consumeThreadMax;

    /***
     * 保存消费对象
     */
    private Map<String,List<Consumer>> customerMap = new HashMap<>();

    public DefaultMQPushConsumer registrtRocketMQConsumer(Set<String> topics){
        log.info("registrtRocketMQConsumer");
        if (StringUtils.isEmpty(groupName)){
            throw  BusinessExceptionFactory.newException(MsgCodeConstant.Error.ERR88000007,"groupName");
        }
        if (StringUtils.isEmpty(namesrvAddr)){
            throw  BusinessExceptionFactory.newException(MsgCodeConstant.Error.ERR88000007,"namesrvAddr");
        }
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        consumer.setNamesrvAddr(namesrvAddr);
        consumer.setConsumeThreadMin(consumeThreadMin);
        consumer.setConsumeThreadMax(consumeThreadMax);
        consumer.setVipChannelEnabled(false);
        consumer.registerMessageListener(new MessageListener(new MessageDispatch(this)));
        try {
            for (String topic : topics) {
                log.info(new SystemLogBuilder().appendLevelTips().appendMsg("订阅消息TOPIC:"+ topic).bulidString());
                consumer.subscribe(topic,"*");
            }
            consumer.start();
        }catch (MQClientException e){
            throw  BusinessExceptionFactory.newException(MsgCodeConstant.Error.ERR88000008);
        }
        return consumer;
    }


    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        log.info("mq staring");
        //消费topic
        Set<String> topics = new HashSet<>();
        //消费对象
        Map<String, Object> beansMap = applicationContext.getBeansWithAnnotation(MqConsumer.class);
        for(Map.Entry entry : beansMap.entrySet()){
            if(!(entry.getValue() instanceof Consumer)){
                continue;
            }
            MqConsumer mqConsumer = (MqConsumer)(entry.getValue().getClass().getAnnotation(MqConsumer.class));
            if(StringUtils.isEmpty(mqConsumer.topic())){
                continue;
            }
            //添加topic
            if(!topics.contains(mqConsumer.topic())){
                topics.add(mqConsumer.topic());
            }
            //注册消费者
            registrtCustomer(mqConsumer.topic(),(Consumer)entry.getValue());
        }
        //注册
        registrtRocketMQConsumer(topics);
    }

    /**
     * 注册消费者
     * @param topic
     * @param customer
     */
    private void registrtCustomer(String topic, Consumer customer) {
        log.info("registrtCustomer");
        List<Consumer> list = customerMap.get(topic);
        if(ObjectUtils.isEmpty(list)){
            list = new ArrayList<>();
        }
        list.add(customer);
        customerMap.put(topic,list);
    }

    /**
     * 获取消费者
     * @param topic
     * @return
     */
    public List<Consumer> getCustomers(String topic) {
        return  customerMap.get(topic);
    }
}
